home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
AMIGA-CD 2
/
Amiga-CD - Volume 2.iso
/
gepackte_disketten
/
1994
/
08_94_5.dms
/
08_94_5.adf
/
term-4.0-Source.lha
/
termMsgQueue.c
< prev
next >
Wrap
C/C++ Source or Header
|
1994-06-16
|
7KB
|
398 lines
/*
** termMsgQueue.c
**
** Special MsgPort like queue management
**
** Copyright © 1990-1994 by Olaf `Olsen' Barthel
** All Rights Reserved
*/
#include "termGlobal.h"
/* DestroyPooled(struct MsgItem *Item):
*
* Default destructor installed by CreateMsgItem() and
* CreateMinMsgItem(); it hands the item to FreeVecPooled().
*
* Item - the message item to release. It is passed on
* without a NULL check of its own; DeleteMsgItem() only
* invokes a destructor for non-NULL items.
*/
STATIC VOID __stdargs
DestroyPooled(struct MsgItem *Item)
{
FreeVecPooled(Item);
}
/* GetMsgItem(struct MsgQueue *Queue):
 *
 *	Take the first item off a queue handle, in the
 *	spirit of GetMsg().
 *
 *	Queue - queue handle to read from; may be NULL.
 *
 *	Returns the item that was unlinked from the head of
 *	the message list, or NULL if the handle is NULL or
 *	the list is empty. When an item is taken and a task
 *	is linked into the wait list, the first such task is
 *	unlinked and signalled with SIGF_SINGLE.
 */
APTR __regargs
GetMsgItem(struct MsgQueue *Queue)
{
	struct SemaphoreRequest *Sleeper;
	APTR Result = NULL;

	// No handle, no message
	if(!Queue)
		return(NULL);

	// All list work happens under the access semaphore
	ObtainSemaphore(&Queue->Access);

	// The head node has a successor only if the list is not empty
	if(Queue->MsgList.mlh_Head->mln_Succ != NULL)
	{
		// Unlink the first item
		Result = Queue->MsgList.mlh_Head;

		Remove((struct Node *)Result);

		// A slot just became free; let one waiting writer continue
		if(Queue->WaitList.mlh_Head->mln_Succ != NULL)
		{
			Sleeper = (struct SemaphoreRequest *)Queue->WaitList.mlh_Head;

			// Unlink first, then wake the task up
			Remove((struct Node *)Sleeper);

			Signal(Sleeper->sr_Waiter,SIGF_SINGLE);
		}

		// Keep the counter from dropping below zero
		if(Queue->QueueSize != 0)
			Queue->QueueSize -= 1;
	}

	ReleaseSemaphore(&Queue->Access);

	return(Result);
}
/* PutMsgItem(struct MsgQueue *Queue,struct MsgItem *Item):
*
* Add a message item to a queue handle, similar to
* PutMsg().
*
* Queue - queue handle to append to.
* Item - message item to append.
*
* Nothing happens if either argument is NULL. If the
* queue's Discard flag is set, the item is not queued but
* passed to DeleteMsgItem(). If MaxSize is non-zero and
* QueueSize equals it, the calling task links itself into
* the wait list and sleeps until it receives SIGF_SINGLE.
* Afterwards the item is appended to the message list and
* SigTask is signalled with SigMask.
*/
VOID __regargs
PutMsgItem(struct MsgQueue *Queue,struct MsgItem *Item)
{
// Valid data?
if(Queue && Item)
{
// Gain access to the handle
ObtainSemaphore(&Queue -> Access);
// Are we to discard this message?
if(Queue -> Discard)
{
// The semaphore is dropped before the item's destructor runs
ReleaseSemaphore(&Queue -> Access);
DeleteMsgItem(Item);
}
else
{
// Are we to watch for a maximum queue size?
// (QueueSize is only counted while MaxSize is non-zero)
if(Queue -> MaxSize)
{
// Maximum queue size already reached?
// NOTE(review): the limit is tested for equality and only once,
// before sleeping; it is not checked again after the wake-up.
if(Queue -> MaxSize == Queue -> QueueSize)
{
// The request is a local variable, so the node linked into
// the wait list lives in this function's stack frame
struct SemaphoreRequest WaitRequest;
// That's me
WaitRequest . sr_Waiter = FindTask(NULL);
// Add this task to the waiting list
AddTail((struct List *)&Queue -> WaitList,(struct Node *)&WaitRequest);
// Careful, please
// (presumably so no other task can run between dropping the
// semaphore and entering Wait() - confirm)
Forbid();
// Drop the semaphore
ReleaseSemaphore(&Queue -> Access);
// Clear the one-shot flag
ClrSignal(SIGF_SINGLE);
// Wait for the queue to become smaller
Wait(SIGF_SINGLE);
// Gain access to the handle
ObtainSemaphore(&Queue -> Access);
// Reenable multitasking
Permit();
}
// We are going to make the queue longer
Queue -> QueueSize++;
}
// Add the item to the list
AddTail((struct List *)&Queue -> MsgList,(struct Node *)Item);
// Wake up the owner
Signal(Queue -> SigTask,Queue -> SigMask);
// Drop the semaphore
ReleaseSemaphore(&Queue -> Access);
}
}
}
/* DeleteMsgItem(struct MsgItem *Item):
 *
 *	Dispose of a message item by calling the destructor
 *	stored in it.
 *
 *	Item - item to dispose of; may be NULL. An item
 *	       without a destructor is left untouched.
 */
VOID __regargs
DeleteMsgItem(struct MsgItem *Item)
{
	// Only an existing item that carries a destructor can be cleaned up
	if(Item != NULL && Item->Destructor != NULL)
		(Item->Destructor)(Item);
}
/* CreateMsgItem(LONG Size):
 *
 *	Allocate a new message item of exactly `Size' bytes
 *	and install the default destructor.
 *
 *	Size - number of bytes to request; handed to
 *	       AllocVecPooled() unchanged. Nothing here checks
 *	       that it covers the MsgItem header.
 *
 *	Returns the new item, or NULL if the allocation
 *	failed.
 */
struct MsgItem * __regargs
CreateMsgItem(LONG Size)
{
	struct MsgItem *NewItem;

	NewItem = (struct MsgItem *)AllocVecPooled(Size,MEMF_ANY);

	// Items from this allocator are released by DestroyPooled()
	if(NewItem != NULL)
		NewItem->Destructor = DestroyPooled;

	return(NewItem);
}
/* CreateMinMsgItem(LONG Size):
 *
 *	Allocate a new message item with room for the
 *	MsgItem header plus `Size' further bytes, and install
 *	the default destructor.
 *
 *	Size - number of bytes wanted in addition to
 *	       sizeof(struct MsgItem).
 *
 *	Returns the new item, or NULL if the allocation
 *	failed.
 */
struct MsgItem * __regargs
CreateMinMsgItem(LONG Size)
{
	struct MsgItem *NewItem;

	// Header first, payload right behind it
	NewItem = (struct MsgItem *)AllocVecPooled(sizeof(struct MsgItem) + Size,MEMF_ANY);

	// Items from this allocator are released by DestroyPooled()
	if(NewItem != NULL)
		NewItem->Destructor = DestroyPooled;

	return(NewItem);
}
/* UnlockMsgQueue(struct MsgQueue *Queue):
 *
 *	Wake up all tasks waiting for the queue to become
 *	smaller. Note: this resets the maximum queue size to
 *	zero, so PutMsgItem() no longer limits the queue
 *	afterwards.
 *
 *	Queue - queue handle; may be NULL, in which case
 *	        nothing happens.
 *
 *	Every wait request is unlinked from the wait list
 *	before its task is signalled with SIGF_SINGLE, so the
 *	list is empty when this routine returns.
 */
VOID __regargs
UnlockMsgQueue(struct MsgQueue *Queue)
{
	// Valid data?
	if(Queue)
	{
		struct SemaphoreRequest *WaitRequest;

		// Gain access to the handle
		ObtainSemaphore(&Queue -> Access);

		// Notify all waiting tasks. Each request is a local variable
		// of the PutMsgItem() call that is waiting, and the woken task
		// does not unlink it itself. It therefore has to leave the
		// list here, exactly as GetMsgItem() does it; otherwise the
		// list keeps pointing into stack frames that are about to
		// disappear.
		while(Queue -> WaitList . mlh_Head -> mln_Succ)
		{
			WaitRequest = (struct SemaphoreRequest *)Queue -> WaitList . mlh_Head;

			// Remove the waiting task from the list
			Remove((struct Node *)WaitRequest);

			// Wake the task up
			Signal(WaitRequest -> sr_Waiter,SIGF_SINGLE);
		}

		// Reset the maximum queue size
		Queue -> MaxSize = 0;

		// Drop the semaphore
		ReleaseSemaphore(&Queue -> Access);
	}
}
/* DeleteMsgQueue(struct MsgQueue *Queue):
 *
 *	Tear down a queue handle: release waiting writers,
 *	wait for the size counter to reach zero, dispose of
 *	every item still linked into the message list, give
 *	back the signal bit if one is recorded in the handle,
 *	and free the handle itself.
 *
 *	Queue - queue handle to free; may be NULL.
 */
VOID __regargs
DeleteMsgQueue(struct MsgQueue *Queue)
{
	struct MsgItem *Item,*Next;

	// Nothing to free
	if(!Queue)
		return;

	// Release every task still waiting for room in the queue
	UnlockMsgQueue(Queue);

	// Wait, with multitasking disabled around the test, until the
	// size counter has dropped to zero.
	// NOTE(review): within this file QueueSize is only decremented
	// by GetMsgItem(), so this loop seems to rely on the queue being
	// drained elsewhere - confirm against the callers.
	Forbid();

	ClrSignal(Queue->SigMask);

	while(Queue->QueueSize != 0)
		Wait(Queue->SigMask);

	Permit();

	// Dispose of whatever is still queued; the successor is fetched
	// before the item is handed to its destructor
	Item = (struct MsgItem *)Queue->MsgList.mlh_Head;

	while((Next = (struct MsgItem *)Item->Link.mln_Succ) != NULL)
	{
		DeleteMsgItem(Item);

		Item = Next;
	}

	// A SigBit of -1 marks a signal mask supplied by the creator
	if(Queue->SigBit != -1)
		FreeSignal(Queue->SigBit);

	// Finally release the handle
	FreeVecPooled(Queue);
}
/* CreateMsgQueue(ULONG SigMask,LONG MaxSize):
 *
 *	Allocate and set up a queue handle owned by the
 *	calling task.
 *
 *	SigMask - signal mask used to notify the owner when an
 *	          item arrives. If zero, a fresh signal bit is
 *	          allocated and its mask is used instead.
 *	MaxSize - stored as the queue's maximum size; zero
 *	          means PutMsgItem() does not limit the queue.
 *
 *	Returns the new handle, or NULL if the memory or the
 *	signal bit could not be allocated.
 */
struct MsgQueue * __regargs
CreateMsgQueue(ULONG SigMask,LONG MaxSize)
{
	struct MsgQueue *Handle;

	Handle = (struct MsgQueue *)AllocVecPooled(sizeof(struct MsgQueue),MEMF_ANY | MEMF_PUBLIC | MEMF_CLEAR);

	// Out of memory
	if(Handle == NULL)
		return(NULL);

	// Access semaphore plus the two lists it protects
	InitSemaphore(&Handle->Access);

	NewList((struct List *)&Handle->MsgList);
	NewList((struct List *)&Handle->WaitList);

	// The queue starts out empty
	Handle->QueueSize	= 0;
	Handle->MaxSize		= MaxSize;

	// The creator is the task to be notified
	Handle->SigTask = FindTask(NULL);

	// A caller-supplied mask is used as is; SigBit -1 records that
	// no signal bit belongs to the handle
	if(SigMask != 0)
	{
		Handle->SigMask	= SigMask;
		Handle->SigBit	= -1;

		return(Handle);
	}

	// Otherwise the handle needs a signal bit of its own
	Handle->SigBit = AllocSignal(-1);

	if(Handle->SigBit == -1)
	{
		FreeVecPooled(Handle);

		return(NULL);
	}

	Handle->SigMask = (1L << Handle->SigBit);

	return(Handle);
}
/* SetQueueDiscard(struct MsgQueue *Queue,BOOL Mode):
 *
 *	Choose whether PutMsgItem() queues new items or
 *	throws them away.
 *
 *	Queue - queue handle; may be NULL, in which case
 *	        nothing happens.
 *	Mode  - stored in the handle's Discard field under the
 *	        access semaphore; PutMsgItem() deletes incoming
 *	        items while it is non-zero.
 */
VOID __regargs
SetQueueDiscard(struct MsgQueue *Queue,BOOL Mode)
{
	// No handle, nothing to change
	if(!Queue)
		return;

	ObtainSemaphore(&Queue->Access);

	Queue->Discard = Mode;

	ReleaseSemaphore(&Queue->Access);
}